Java 多线程-阻塞队列相关工具类
阻塞队列的由来
参考资料 第十三章 阻塞队列
假设一种场景,生产者一直生产资源,消费者一直消费资源,资源存储在一个缓冲池中,生产者将生产的资源存进缓冲池中,消费者从缓冲池中拿到资源进行消费,这就是大名鼎鼎的 生产者-消费者模式。
该模式能够简化开发过程,一方面消除了生产者类与消费者类之间的代码依赖性,另一方面将生产数据的过程与使用数据的过程解耦简化负载。
自己实现这个模式的时候,因为需要让多个线程操作共享变量(即资源),所以很容易引发线程安全问题,造成 重复消费和死锁,尤其是生产者和消费者存在多个的情况。另外,当缓冲池空了,我们需要阻塞消费者,唤醒生产者;当缓冲池满了,我们需要阻塞生产者,唤醒消费者,这些个等待-唤醒逻辑都需要自己实现。
这么容易出错的事情,JDK当然帮我们做啦,这就是阻塞队列(BlockingQueue),你只管往里面存、取就行,而不用担心多线程环境下存、取共享变量的线程安全问题。
BlockingQueue 是
Java util.concurrent
包下重要的数据结构,区别于普通的队列,BlockingQueue 提供了线程安全的队列访问方式,并发包下很多高级同步类的实现都是基于 BlockingQueue 实现的。
BlockingQueue 的操作方法
阻塞队列提供了四组不同的方法用于插入、移除、检查元素:
方法\处理方式 | 抛出异常 | 返回特殊值 | 一直阻塞 | 超时退出 |
---|---|---|---|---|
插入方法 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除方法 | remove() | poll() | take() | poll(time,unit) |
检查方法 | element() | peek() | - | - |
抛出异常:如果试图的操作无法立即执行,抛异常。当阻塞队列满时候,再往队列里插入元素,会抛出 IllegalStateException(“Queue full”)
异常。当队列为空时,从队列里获取元素时会抛出 NoSuchElementException
异常 。
返回特殊值:如果试图的操作无法立即执行,返回一个特殊值,通常是true / false。
一直阻塞:如果试图的操作无法立即执行,则一直阻塞或者响应中断。
超时退出:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,但等待时间不会超过给定值。返回一个特定值以告知该操作是否成功,通常是 true / false。
注意之处
- 不能往阻塞队列中插 入null,会抛出空指针异常。
- 可以访问阻塞队列中的任意元素,调用
remove(o)
可以将队列之中的特定对象移除,但并不高效,尽量避免使用。
BlockingQueue 的实现类
ArrayBlockingQueue
由数组结构组成的有界阻塞队列。内部结构是数组,故具有数组的特性。
public ArrayBlockingQueue(int capacity, boolean fair){
//..省略代码
}
可以初始化队列大小, 且一旦初始化不能改变。构造方法中的 fair 表示控制对象的内部锁是否采用公平锁,默认是 非公平锁。
LinkedBlockingQueue
由链表结构组成的有界阻塞队列。内部结构是链表,具有链表的特性。默认队列的大小是 Integer.MAX_VALUE
,也可以指定大小。此队列按照先进先出的原则对元素进行排序。
DelayQueue
该队列中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素 。注入其中的元素必须实现 java.util.concurrent.Delayed
接口。
DelayQueue 是一个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。
PriorityBlockingQueue
基于优先级的无界阻塞队列(优先级的判断通过构造函数传入的 comparator 对象来决定),内部控制线程同步的锁采用的是非公平锁。
public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator) {
this.lock = new ReentrantLock(); //默认构造方法-非公平锁
...//其余代码略
}
PriorityBlockingQueue 不会阻塞数据生产者(因为队列是无界的),而只会在没有可消费的数据时,阻塞数据的消费者。因此使用的时候要特别注意,生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间。对于使用默认大小的 LinkedBlockingQueue 也是一样的。
BlockingQueue 示例和使用场景
生产者-消费者模型
public class Temp {
public static void main(String[] args) {
int queueSize = 10;
ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(queueSize);
// 消费者
new Thread(() -> {
while (true) {
try {
queue.take();
System.out.println("从队列取走一个元素,队列剩余" + queue.size() + "个元素");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "consumer").start();
// 生产者
new Thread(() -> {
while (true) {
try {
queue.put(1);
System.out.println("向队列取中插入一个元素,队列剩余空间:" + (queueSize - queue.size()));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "producer").start();
}
}
注意,这个例子中的输出结果看起来可能有问题,比如有几行在插入一个元素之后,队列的剩余空间不变。这是由于 System.out.println
语句没有锁。
考虑到这样的情况:线程1 在执行完 put/take
操作后立即失去 CPU 时间片,然后切换到线程2 执行 put/take
操作,执行完毕后回到线程1 的 System.out.println
语句并输出,发现这个时候阻塞队列的 size 已经被线程2改变了,所以这个时候输出的 size 并不是当时线程1 执行完 put/take
操作之后阻塞队列的 size,但可以确保的是size不会超过10个。
实际上使用阻塞队列是没有问题的。
线程池中使用阻塞队列
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
Java 中的线程池就是使用阻塞队列实现的